{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "pknVo1kM2wI2"
      },
      "source": [
        "##### Copyright 2021 The TensorFlow Authors."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "SoFqANDE222Y"
      },
      "outputs": [],
      "source": [
        "#@title Licensed under the Apache License, Version 2.0 (the \"License\");\n",
        "# you may not use this file except in compliance with the License.\n",
        "# You may obtain a copy of the License at\n",
        "#\n",
        "# https://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing, software\n",
        "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
        "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
        "# See the License for the specific language governing permissions and\n",
        "# limitations under the License."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "6x1ypzczQCwy"
      },
      "source": [
        "# Simple TFX Pipeline for Vertex Pipelines\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_445qeKq8e3-"
      },
      "source": [
        "<div class=\"devsite-table-wrapper\"><table class=\"tfo-notebook-buttons\" align=\"left\">\n",
        "<td><a target=\"_blank\" href=\"https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_simple\">\n",
        "<img src=\"https://www.tensorflow.org/images/tf_logo_32px.png\"/>View on TensorFlow.org</a></td>\n",
        "<td><a target=\"_blank\" href=\"https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/gcp/vertex_pipelines_simple.ipynb\">\n",
        "<img src=\"https://www.tensorflow.org/images/colab_logo_32px.png\">Run in Google Colab</a></td>\n",
        "<td><a target=\"_blank\" href=\"https://github.com/tensorflow/tfx/tree/master/docs/tutorials/tfx/gcp/vertex_pipelines_simple.ipynb\">\n",
        "<img width=32px src=\"https://www.tensorflow.org/images/GitHub-Mark-32px.png\">View source on GitHub</a></td>\n",
        "<td><a href=\"https://storage.googleapis.com/tensorflow_docs/tfx/docs/tutorials/tfx/gcp/vertex_pipelines_simple.ipynb\"><img src=\"https://www.tensorflow.org/images/download_logo_32px.png\" />Download notebook</a></td>\n",
        "<td><a href=\"https://console.cloud.google.com/mlengine/notebooks/deploy-notebook?q=download_url%3Dhttps%253A%252F%252Fraw.githubusercontent.com%252Ftensorflow%252Ftfx%252Fmaster%252Fdocs%252Ftutorials%252Ftfx%252Fgcp%252Fvertex_pipelines_simple.ipynb\"><img src=\"https://www.tensorflow.org/images/download_logo_32px.png\" />Run in Google Cloud AI Platform Notebook</a></td>\n",
        "</table></div>\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_VuwrlnvQJ5k"
      },
      "source": [
        "This notebook-based tutorial will create a simple TFX pipeline and run it using\n",
        "Google Cloud Vertex Pipelines.  This notebook is based on the TFX pipeline\n",
        "we built in\n",
        "[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).\n",
        "If you are not familiar with TFX and you have not read that tutorial yet, you\n",
        "should read it before proceeding with this notebook.\n",
        "\n",
        "Google Cloud Vertex Pipelines helps you to automate, monitor, and govern\n",
        "your ML systems by orchestrating your ML workflow in a serverless manner. You\n",
        "can define your ML pipelines using Python with TFX, and then execute your\n",
        "pipelines on Google Cloud. See\n",
        "[Vertex Pipelines introduction](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)\n",
        "to learn more about Vertex Pipelines."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "x4U5gp15QJ2b"
      },
      "source": [
        "This notebook is intended to be run on\n",
        "[Google Colab](https://colab.research.google.com/notebooks/intro.ipynb) or on\n",
        "[AI Platform Notebooks](https://cloud.google.com/ai-platform-notebooks). If you\n",
        "are not using one of these, you can simply click \"Run in Goolge Colab\" button\n",
        "above.\n",
        "\n",
        "## Set up\n",
        "Before you run this notebook, ensure that you have following:\n",
        "- A [Google Cloud Platform](http://cloud.google.com/) project.\n",
        "- A [Google Cloud Storage](https://cloud.google.com/storage) bucket. See\n",
        "[the guide for creating buckets](https://cloud.google.com/storage/docs/creating-buckets).\n",
        "- Enable\n",
        "[Vertex AI and Cloud Storage API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com,storage-component.googleapis.com).\n",
        "\n",
        "Please see\n",
        "[Vertex documentation](https://cloud.google.com/vertex-ai/docs/pipelines/configure-project)\n",
        "to configure your GCP project further."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "fwZ0aXisoBFW"
      },
      "source": [
        "### Install python packages"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "WC9W_S-bONgl"
      },
      "source": [
        "We will install required Python packages including TFX and KFP to author ML\n",
        "pipelines and submit jobs to Vertex Pipelines."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "iyQtljP-qPHY"
      },
      "outputs": [],
      "source": [
        "# Use the latest version of pip.\n",
        "!pip install --upgrade pip\n",
        "!pip install --upgrade tfx==0.30.0 kfp==1.6.1"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "EwT0nov5QO1M"
      },
      "source": [
        "#### Did you restart the runtime?\n",
        "\n",
        "If you are using Google Colab, the first time that you run\n",
        "the cell above, you must restart the runtime by clicking\n",
        "above \"RESTART RUNTIME\" button or using \"Runtime > Restart\n",
        "runtime ...\" menu. This is because of the way that Colab\n",
        "loads packages."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-CRyIL4LVDlQ"
      },
      "source": [
        "If you are not on Colab, you can restart runtime with following cell."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "KHTSzMygoBF6"
      },
      "outputs": [],
      "source": [
        "# docs_infra: no_execute\n",
        "import sys\n",
        "if not 'google.colab' in sys.modules:\n",
        "  # Automatically restart kernel after installs\n",
        "  import IPython\n",
        "  app = IPython.Application.instance()\n",
        "  app.kernel.do_shutdown(True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gckGHdW9iPrq"
      },
      "source": [
        "### Login in to Google for this notebook\n",
        "If you are running this notebook on Colab, authenticate with your user account:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "kZQA0KrfXCvU"
      },
      "outputs": [],
      "source": [
        "import sys\n",
        "if 'google.colab' in sys.modules:\n",
        "  from google.colab import auth\n",
        "  auth.authenticate_user()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "aaqJjbmk6o0o"
      },
      "source": [
        "**If you are on AI Platform Notebooks**, authenticate with Google Cloud before\n",
        "running the next section, by running\n",
        "```sh\n",
        "gcloud auth login\n",
        "```\n",
        "**in the Terminal window** (which you can open via **File** > **New** in the\n",
        "menu). You only need to do this once per notebook instance."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "3_SveIKxaENu"
      },
      "source": [
        "Check the package versions."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Xd-iP9wEaENu"
      },
      "outputs": [],
      "source": [
        "import tensorflow as tf\n",
        "print('TensorFlow version: {}'.format(tf.__version__))\n",
        "from tfx import v1 as tfx\n",
        "print('TFX version: {}'.format(tfx.__version__))\n",
        "import kfp\n",
        "print('KFP version: {}'.format(kfp.__version__))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "aDtLdSkvqPHe"
      },
      "source": [
        "### Set up variables\n",
        "\n",
        "We will set up some variables used to customize the pipelines below. Following\n",
        "information is required:\n",
        "\n",
        "* GCP Project id. See\n",
        "[Identifying your project id](https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects).\n",
        "* GCP Region to run pipelines. For more information about the regions that\n",
        "Vertex Pipelines is available in, see the\n",
        "[Vertex AI locations guide](https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability).\n",
        "* Google Cloud Storage Bucket to store pipeline outputs.\n",
        "\n",
        "**Enter required values in the cell below before running it**.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "EcUseqJaE2XN"
      },
      "outputs": [],
      "source": [
        "GOOGLE_CLOUD_PROJECT = ''     # <--- ENTER THIS\n",
        "GOOGLE_CLOUD_REGION = ''      # <--- ENTER THIS\n",
        "GCS_BUCKET_NAME = ''          # <--- ENTER THIS\n",
        "\n",
        "if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):\n",
        "    from absl import logging\n",
        "    logging.error('Please set all required parameters.')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "GAaCPLjgiJrO"
      },
      "source": [
        "Set `gcloud` to use your project."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "VkWdxe4TXRHk"
      },
      "outputs": [],
      "source": [
        "!gcloud config set project {GOOGLE_CLOUD_PROJECT}"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "CPN6UL5CazNy"
      },
      "outputs": [],
      "source": [
        "PIPELINE_NAME = 'penguin-vertex-pipelines'\n",
        "\n",
        "# Path to various pipeline artifact.\n",
        "PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(\n",
        "    GCS_BUCKET_NAME, PIPELINE_NAME)\n",
        "\n",
        "# Paths for users' Python module.\n",
        "MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(\n",
        "    GCS_BUCKET_NAME, PIPELINE_NAME)\n",
        "\n",
        "# Paths for input data.\n",
        "DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)\n",
        "\n",
        "# This is the path where your model will be pushed for serving.\n",
        "SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(\n",
        "    GCS_BUCKET_NAME, PIPELINE_NAME)\n",
        "\n",
        "print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8F2SRwRLSYGa"
      },
      "source": [
        "### Prepare example data\n",
        "We will use the same\n",
        "[Palmer Penguins dataset](https://allisonhorst.github.io/palmerpenguins/articles/intro.html)\n",
        "as\n",
        "[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).\n",
        "\n",
        "There are four numeric features in this dataset which were already normalized\n",
        "to have range [0,1]. We will build a classification model which predicts the\n",
        "`species` of penguins."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "11J7XiCq6AFP"
      },
      "source": [
        "We need to make our own copy of the dataset. Because TFX ExampleGen reads\n",
        "inputs from a directory, we need to create a directory and copy dataset to it\n",
        "on GCS."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "4fxMs6u86acP"
      },
      "outputs": [],
      "source": [
        "!gsutil cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ASpoNmxKSQjI"
      },
      "source": [
        "Take a quick look at the CSV file."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "-eSz28UDSnlG"
      },
      "outputs": [],
      "source": [
        "!gsutil cat {DATA_ROOT}/penguins_processed.csv | head"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "nH6gizcpSwWV"
      },
      "source": [
        "## Create a pipeline\n",
        "\n",
        "TFX pipelines are defined using Python APIs. We will define a pipeline which\n",
        "consists of three components, CsvExampleGen, Trainer and Pusher. The pipeline\n",
        "and model definition is almost the same as\n",
        "[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).\n",
        "\n",
        "The only difference is that we don't need to set `metadata_connection_config`\n",
        "which is used to locate\n",
        "[ML Metadata](https://www.tensorflow.org/tfx/guide/mlmd) database. Because\n",
        "Vertex Pipelines uses a managed metadata service, users don't need to care\n",
        "of it, and we don't need to specify the parameter.\n",
        "\n",
        "Before actually define the pipeline, we need to write a model code for the\n",
        "Trainer component first."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "lOjDv93eS5xV"
      },
      "source": [
        "### Write model code.\n",
        "\n",
        "We will use the same model code as in the\n",
        "[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "aES7Hv5QTDK3"
      },
      "outputs": [],
      "source": [
        "_trainer_module_file = 'penguin_trainer.py'"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "Gnc67uQNTDfW"
      },
      "outputs": [],
      "source": [
        "%%writefile {_trainer_module_file}\n",
        "\n",
        "# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple\n",
        "\n",
        "from typing import List\n",
        "from absl import logging\n",
        "import tensorflow as tf\n",
        "from tensorflow import keras\n",
        "from tensorflow_transform.tf_metadata import schema_utils\n",
        "\n",
        "\n",
        "from tfx import v1 as tfx\n",
        "from tfx_bsl.public import tfxio\n",
        "\n",
        "from tensorflow_metadata.proto.v0 import schema_pb2\n",
        "\n",
        "_FEATURE_KEYS = [\n",
        "    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'\n",
        "]\n",
        "_LABEL_KEY = 'species'\n",
        "\n",
        "_TRAIN_BATCH_SIZE = 20\n",
        "_EVAL_BATCH_SIZE = 10\n",
        "\n",
        "# Since we're not generating or creating a schema, we will instead create\n",
        "# a feature spec.  Since there are a fairly small number of features this is\n",
        "# manageable for this dataset.\n",
        "_FEATURE_SPEC = {\n",
        "    **{\n",
        "        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)\n",
        "           for feature in _FEATURE_KEYS\n",
        "       },\n",
        "    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)\n",
        "}\n",
        "\n",
        "\n",
        "def _input_fn(file_pattern: List[str],\n",
        "              data_accessor: tfx.components.DataAccessor,\n",
        "              schema: schema_pb2.Schema,\n",
        "              batch_size: int) -> tf.data.Dataset:\n",
        "  \"\"\"Generates features and label for training.\n",
        "\n",
        "  Args:\n",
        "    file_pattern: List of paths or patterns of input tfrecord files.\n",
        "    data_accessor: DataAccessor for converting input to RecordBatch.\n",
        "    schema: schema of the input data.\n",
        "    batch_size: representing the number of consecutive elements of returned\n",
        "      dataset to combine in a single batch\n",
        "\n",
        "  Returns:\n",
        "    A dataset that contains (features, indices) tuple where features is a\n",
        "      dictionary of Tensors, and indices is a single Tensor of label indices.\n",
        "  \"\"\"\n",
        "  return data_accessor.tf_dataset_factory(\n",
        "      file_pattern,\n",
        "      tfxio.TensorFlowDatasetOptions(\n",
        "          batch_size=batch_size, label_key=_LABEL_KEY),\n",
        "      schema=schema).repeat()\n",
        "\n",
        "\n",
        "def _make_keras_model() -> tf.keras.Model:\n",
        "  \"\"\"Creates a DNN Keras model for classifying penguin data.\n",
        "\n",
        "  Returns:\n",
        "    A Keras Model.\n",
        "  \"\"\"\n",
        "  # The model below is built with Functional API, please refer to\n",
        "  # https://www.tensorflow.org/guide/keras/overview for all API options.\n",
        "  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]\n",
        "  d = keras.layers.concatenate(inputs)\n",
        "  for _ in range(2):\n",
        "    d = keras.layers.Dense(8, activation='relu')(d)\n",
        "  outputs = keras.layers.Dense(3)(d)\n",
        "\n",
        "  model = keras.Model(inputs=inputs, outputs=outputs)\n",
        "  model.compile(\n",
        "      optimizer=keras.optimizers.Adam(1e-2),\n",
        "      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n",
        "      metrics=[keras.metrics.SparseCategoricalAccuracy()])\n",
        "\n",
        "  model.summary(print_fn=logging.info)\n",
        "  return model\n",
        "\n",
        "\n",
        "# TFX Trainer will call this function.\n",
        "def run_fn(fn_args: tfx.components.FnArgs):\n",
        "  \"\"\"Train the model based on given args.\n",
        "\n",
        "  Args:\n",
        "    fn_args: Holds args used to train the model as name/value pairs.\n",
        "  \"\"\"\n",
        "\n",
        "  # This schema is usually either an output of SchemaGen or a manually-curated\n",
        "  # version provided by pipeline author. A schema can also derived from TFT\n",
        "  # graph if a Transform component is used. In the case when either is missing,\n",
        "  # `schema_from_feature_spec` could be used to generate schema from very simple\n",
        "  # feature_spec, but the schema returned would be very primitive.\n",
        "  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)\n",
        "\n",
        "  train_dataset = _input_fn(\n",
        "      fn_args.train_files,\n",
        "      fn_args.data_accessor,\n",
        "      schema,\n",
        "      batch_size=_TRAIN_BATCH_SIZE)\n",
        "  eval_dataset = _input_fn(\n",
        "      fn_args.eval_files,\n",
        "      fn_args.data_accessor,\n",
        "      schema,\n",
        "      batch_size=_EVAL_BATCH_SIZE)\n",
        "\n",
        "  model = _make_keras_model()\n",
        "  model.fit(\n",
        "      train_dataset,\n",
        "      steps_per_epoch=fn_args.train_steps,\n",
        "      validation_data=eval_dataset,\n",
        "      validation_steps=fn_args.eval_steps)\n",
        "\n",
        "  # The result of the training should be saved in `fn_args.serving_model_dir`\n",
        "  # directory.\n",
        "  model.save(fn_args.serving_model_dir, save_format='tf')"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "-LsYx8MpYvPv"
      },
      "source": [
        "Copy the module file to GCS which can be accessed from the pipeline components.\n",
        "Because model training happens on GCP, we need to upload this model definition. \n",
        "\n",
        "Otherwise, you might want to build a container image including the module file\n",
        "and use the image to run the pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "rMMs5wuNYAbc"
      },
      "outputs": [],
      "source": [
        "!gsutil cp {_trainer_module_file} {MODULE_ROOT}/"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "w3OkNz3gTLwM"
      },
      "source": [
        "### Write a pipeline definition\n",
        "\n",
        "We will define a function to create a TFX pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "M49yYVNBTPd4"
      },
      "outputs": [],
      "source": [
        "# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and\n",
        "# slightly modified because we don't need `metadata_path` argument.\n",
        "\n",
        "def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,\n",
        "                     module_file: str, serving_model_dir: str,\n",
        "                     ) -> tfx.dsl.Pipeline:\n",
        "  \"\"\"Creates a three component penguin pipeline with TFX.\"\"\"\n",
        "  # Brings data into the pipeline.\n",
        "  example_gen = tfx.components.CsvExampleGen(input_base=data_root)\n",
        "\n",
        "  # Uses user-provided Python function that trains a model.\n",
        "  trainer = tfx.components.Trainer(\n",
        "      module_file=module_file,\n",
        "      examples=example_gen.outputs['examples'],\n",
        "      train_args=tfx.proto.TrainArgs(num_steps=100),\n",
        "      eval_args=tfx.proto.EvalArgs(num_steps=5))\n",
        "\n",
        "  # Pushes the model to a filesystem destination.\n",
        "  pusher = tfx.components.Pusher(\n",
        "      model=trainer.outputs['model'],\n",
        "      push_destination=tfx.proto.PushDestination(\n",
        "          filesystem=tfx.proto.PushDestination.Filesystem(\n",
        "              base_directory=serving_model_dir)))\n",
        "\n",
        "  # Following three components will be included in the pipeline.\n",
        "  components = [\n",
        "      example_gen,\n",
        "      trainer,\n",
        "      pusher,\n",
        "  ]\n",
        "\n",
        "  return tfx.dsl.Pipeline(\n",
        "      pipeline_name=pipeline_name,\n",
        "      pipeline_root=pipeline_root,\n",
        "      components=components)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "mJbq07THU2GV"
      },
      "source": [
        "## Run the pipeline on Vertex Pipelines.\n",
        "\n",
        "We used `LocalDagRunner` which runs on local environment in\n",
        "[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).\n",
        "TFX provides multiple orchestrators to run your pipeline. In this tutorial we\n",
        "will use the Vertex Pipelines together with the Kubeflow V2 dag runner."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "7mp0AkmrPdUb"
      },
      "source": [
        "We need to define a runner to actually run the pipeline. You will compile\n",
        "your pipeline into our pipeline definition format using TFX APIs."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "fAtfOZTYWJu-"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "\n",
        "PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'\n",
        "\n",
        "runner = tfx.orchestration.experimental.KubeflowV2DagRunner(\n",
        "    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),\n",
        "    output_filename=PIPELINE_DEFINITION_FILE)\n",
        "# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.\n",
        "_ = runner.run(\n",
        "    _create_pipeline(\n",
        "        pipeline_name=PIPELINE_NAME,\n",
        "        pipeline_root=PIPELINE_ROOT,\n",
        "        data_root=DATA_ROOT,\n",
        "        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),\n",
        "        serving_model_dir=SERVING_MODEL_DIR))"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "fWyITYSDd8w4"
      },
      "source": [
        "The generated definition file can be submitted using kfp client."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "tI71jlEvWMV7"
      },
      "outputs": [],
      "source": [
        "# docs_infra: no_execute\n",
        "from kfp.v2.google import client\n",
        "\n",
        "pipelines_client = client.AIPlatformClient(\n",
        "    project_id=GOOGLE_CLOUD_PROJECT,\n",
        "    region=GOOGLE_CLOUD_REGION,\n",
        ")\n",
        "\n",
        "_ = pipelines_client.create_run_from_job_spec(PIPELINE_DEFINITION_FILE)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "L3k9f5IVQXcQ"
      },
      "source": [
        "Now you can visit the link in the output above or visit\n",
        "'Vertex AI > Pipelines' in\n",
        "[Google Cloud Console](https://console.cloud.google.com/) to see the\n",
        "progress."
      ]
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [
        "pknVo1kM2wI2"
      ],
      "name": "vertex_pipelines_simple.ipynb",
      "toc_visible": true
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
